package fn

import "sync"

/*
 * 共享channel 消费，自定义分发策略
 */

// Split 1->N，分发一个channel到多个channel， 轮询分发
func Split(ch <-chan int, n int) []chan int {
	cs := make([]chan int, 0)
	for i := 0; i < n; i++ {
		cs = append(cs, make(chan int))
	}

	distributeToChannels := func(ch <-chan int, cs []chan int) {
		// 转发结束时关闭全部输出通道
		defer func(cs []chan int) {
			for _, c := range cs {
				close(c)
			}
		}(cs)

		// 轮询分发
		i := -1
		for val := range ch {
			i = (i + 1) % len(cs)
			cs[i] <- val
		}
	}

	go distributeToChannels(ch, cs)

	return cs
}

// Split 1->N，分发一个channel到多个channel， 指定策略分发
type SPLIT_FUNC func(data int) int

func SplitF(ch <-chan int, n int, f SPLIT_FUNC) []chan int {
	cs := make([]chan int, 0)
	for i := 0; i < n; i++ {
		cs = append(cs, make(chan int))
	}

	distributeToChannels := func(ch <-chan int, cs []chan int) {
		// 转发结束时关闭全部输出通道
		defer func(cs []chan int) {
			for _, c := range cs {
				close(c)
			}
		}(cs)

		// 轮询分发
		i := 0
		for val := range ch {
			i = f(val)
			if i < 0 || i >= len(cs) {
				panic("Split function error!")
			}

			cs[i] <- val
		}
	}

	go distributeToChannels(ch, cs)

	return cs
}

// Clone 1->N，复制一个channel到多个channel
func Clone(ch <-chan int, n int) []chan int {
	cs := make([]chan int, 0)
	for i := 0; i < n; i++ {
		cs = append(cs, make(chan int))
	}

	distributeToChannels := func(ch <-chan int, cs []chan int) {
		// 转发结束时关闭全部输出通道
		defer func(cs []chan int) {
			for _, c := range cs {
				close(c)
			}
		}(cs)

		// 输出的每个通道都【复制】一份数据
		for val := range ch {
			for _, c := range cs {
				c <- val
			}
		}
	}

	go distributeToChannels(ch, cs)

	return cs
}

/*
 * channel 合并
 */

// N->1，合并多个channel到一个channel， Fan-in实现
func Merge(cs []chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// 拷贝每个通道输出到统一输出out channel
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// 起监控, 确保所有输出合并完成后关闭输出out channel
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
